我們在實務上面常常會遇到要定期執行某項事件的時候,這個時候有很多套件或軟體可以達成這件事情,今天要介紹的是 Python 中的 ApScheduler 這個套件,它可以幫我們設置定時任務,並在我們需要的頻率時間上去執行,功能十分齊全。
而我們定時任務的基底有了,便可以將其結合上 FastAPI,讓我們透過 FastAPI 達成後台的功用,可以來管理 ApScheduler 上的排程任務。
ApScheduler 提供了基於日期、時間間隔、Crontab類型的排程任務,且可以動態的增刪排程任務,執行過程中也會幫你保留上次任務的狀態,十分的好用。
pip install apscheduler
直接使用 pip 進行安裝即可。
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
def job():
print(f"time: {datetime.now().strftime('%d/%m/%Y %H:%M:%S')} I'm working...")
if __name__ == "__main__":
scheduler = BlockingScheduler()
scheduler.add_job(job, "interval", seconds=5)
scheduler.start()
我們用一個簡單的例子來帶大家使用這個套件,首先我們使用的是阻塞性的調度器,在調度器開始時會占用該使用的執行緒,來定期執行它所管轄的工作。
成功執行會像這樣,每隔五秒打印出它正在執行的字樣。
ApScheduler 有四大組成要件,分別為:
觸發器 ( triggers )
觸發器指的是你選擇使用何種觸發方式來執行你的任務,是前面提到的指定日期、時間間隔、crontab 等方式。
任務儲存區 ( job stores )
是任務儲存的地方,儲存前會先序列化,要執行時在反序列化取出。默認是存放在記憶體中,也可以設定儲放在資料庫中。
執行器 ( excutor )
執行器會將要執行的任務安排進入執行緒池或進程池中執行。
調度器 ( scheduler )
調度器屬於整體系統的指揮官,它會安排上面3個組件如何工作,通常開發人員不會直接操控到上面3個組件,而是透過調度器來達成想要的目的。
ApScheduler 有兩種新增任務的方法:
呼叫 add_job()
scheduler.add_job(job, "interval", seconds=5)
這個方法會回傳一個 job 物件,可以使用這個物件來更新或刪除你所新增的任務。
使用 [scheduled_job()](https://apscheduler.readthedocs.io/en/3.x/modules/schedulers/base.html#apscheduler.schedulers.base.BaseScheduler.scheduled_job)
裝飾器
@scheduler.scheduled_job(trigger="interval", seconds=5)
def job():
print(f"time: {datetime.now().strftime('%d/%m/%Y %H:%M:%S')} I'm working...")
當調度器啟動時該任務就會被自動加入調度器中。
job = scheduler.add_job(job, "interval", seconds=5)
job.remove()
我們剛剛有提到 add_job()
時會回傳 job 物件,我們可以使用該物件的 remove 方法來刪除任務。
更細節的部份就請大家去查閱官方文件了,我們接著介紹怎麼與 fastapi 集成。
我們要讓 FastAPI 可以去管理 ApScheduler 的工作,實作完後呼叫 API 便可以查看目前有甚麼工作在進行,新增刪除排程工作。
from fastapi import FastAPI, status
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from datetime import datetime
from pytz import utc
app = FastAPI()
scheduler = None
def job(id: str = "No id"):
print(
f"time: {datetime.now().strftime('%d/%m/%Y %H:%M:%S')} Work: {id} I'm working..."
)
@app.on_event("startup")
def init_scheduler():
jobstores = {
"default": SQLAlchemyJobStore(url="sqlite:///jobs.sqlite"),
}
executors = {
"default": ThreadPoolExecutor(20),
"processpool": ProcessPoolExecutor(5),
}
job_defaults = {"coalesce": False, "max_instances": 3}
global scheduler
scheduler = BackgroundScheduler(
jobstores=jobstores,
executors=executors,
job_defaults=job_defaults,
timezone=utc,
)
scheduler.start()
@app.on_event("shutdown")
def shutdown_scheduler():
scheduler.shutdown()
我們寫一個 init_scheduler
的函式,在這個函式裡面我們設定我們調度器所有的參數,並且初始化它。我們在這個函式上用裝飾器讓他可以在 FastAPI 初始化時一併執行。
@app.get("/jobs")
def get_jobs():
jobs = scheduler.get_jobs()
return [{"id": job.id, "next_run_time": job.next_run_time} for job in jobs]
@app.post("/jobs", status_code=status.HTTP_201_CREATED)
def add_job(id: str):
scheduler.add_job(job, "interval", seconds=5, id=id, args=[id])
return {"message": "job added"}
@app.delete("/jobs/{id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_job(id: str):
scheduler.remove_job(id)
router 的部分就是根據我們的需求:
成功後可以看到我們新增完一個工作後,排程任務有確實執行,刪除後排程任務也照常停止運作。
有排程工作時,使用 GET 可以拿取到這些我們所需要的資料來加以利用。
在今天的介紹中,FastAPI 比較像是配角的部分,提供一個讓使用者可以操作 ApScheduler 的接口,而不用直接操作程式來修改排程任務,當然也可以作更多的應用這裡就不一一介紹了,留給各位朋友們去發掘。
apScheduler和fastapi交互_fastapi 触发器-CSDN博客
User guide — APScheduler 3.10.4.post1 documentation